package com.iotdb.zjc.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * @Author: zjc
 * @ClassName KafkaConsumer
 * @Description Spring service that listens on the Kafka topic {@code TopicA} and prints
 *              every record value of each received batch to standard output.
 * @date 2021/11/19 11:32
 * @Version 1.0
 */
@Service
public class KafkaConsumer {
    // Bound to this class (not KafkaProducer) so log lines are attributed to the consumer.
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    /*
     * Notes on the @KafkaListener attributes:
     *
     * 1. id: the consumer ID;
     *
     * 2. groupId: the consumer group ID;
     *
     * 3. topics: the topic(s) to listen to; more than one may be given;
     *
     * 4. topicPartitions: finer-grained listening configuration; a specific topic,
     *    partition and offset can be selected.
     *
     * The "onMessage2" listener of the quoted article (not present in this file) means:
     * listen to partition 0 of topic1, and also to partition 0 of topic2 plus partition 1
     * of topic2 starting from offset 8.
     *
     * Note: topics and topicPartitions cannot be used at the same time.
     * ————————————————
     * Copyright notice: this text is an original article by the CSDN blogger "Felix-Yuan",
     * licensed under CC 4.0 BY-SA. Please include the original source link and this notice
     * when reproducing it.
     * Original link: https://blog.csdn.net/yuanlong122716/article/details/105160545/
     */

    /*
     * Consumer method 1 (disabled): single-record listener bound to partition 0 of TopicA.
     * NOTE(review): the last placeholder of the log format below is written as "()" instead
     * of "{}", so the record value would not be interpolated — fix before re-enabling.
     */
//    @Async()
//    @KafkaListener(id = "consumer1",groupId = "consumer-group",topicPartitions = {
//            @TopicPartition(topic = "TopicA", partitions = { "0" }),
//           // @TopicPartition(topic = "topicB", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
//    })
//    public void processMessage(ConsumerRecord<?, ?> record){
//        Optional<?> consumer = Optional.ofNullable(record.value());
//        if(consumer.isPresent()){
//            log.info("接收到消息，线程id为：[{}]Topic为{}partition为{}offset为{}value为()",Thread.currentThread().getId(),record.topic(),record.partition(),record.offset(),record.value());
//        }
//    }

    /**
     * Consumer method 2: receives a batch of records from {@code TopicA}, prints the batch
     * size and then the value of every record to standard output.
     *
     * <p>A {@code null} batch is treated as an empty one, so the size line reports 0 instead
     * of failing with a {@link NullPointerException}.
     *
     * <p>NOTE(review): receiving a {@code List} of records presumably requires batch
     * listening to be enabled on the listener container factory — confirm against the
     * Kafka configuration, which is not visible in this file.
     *
     * @param records the consumed batch; may be {@code null}
     */
    @KafkaListener(id = "consumer2",groupId = "consumer-group", topics = "TopicA")
    public void onMessage3(List<ConsumerRecord<?, ?>> records) {
        // Normalise once and use the normalised list everywhere below.
        List<ConsumerRecord<?, ?>> batch = records != null ? records : new ArrayList<>();
        System.out.println(">>>批量消费一次，records.size()=" + batch.size());
        for (ConsumerRecord<?, ?> record : batch) {
            System.out.println(record.value());
        }
    }

    /*
     * Disabled alternative: listener on TopicA that receives the message payload as a String.
     */
//    @Async()
//    @KafkaListener( topics = {"TopicA"})
//    public void processMessage(String message){
//        log.info("接收到消息，线程id为：[{}]{}{}",Thread.currentThread().getId(),"TopicA",message);
//    }

}
